#!/usr/bin/env python
#
# Copyright (C) 2014 Inktank <info@inktank.com>
# Copyright (C) 2014 Cloudwatt <libre.licensing@cloudwatt.com>
#
# Author: Loic Dachary <loic@dachary.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Library Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Library Public License for more details.
#

import argparse
import errno
import fcntl
import logging
import os
import os.path
import platform
import re
import subprocess
import stat
import sys
import tempfile
import uuid

"""
Prepare:
 - create GPT partition
 - mark the partition with the ceph type uuid
 - create a file system
 - mark the fs as ready for ceph consumption
 - entire data disk is used (one big partition)
 - a new partition is added to the journal disk (so it can be easily shared)

 - triggered by administrator or ceph-deploy, e.g.  'ceph-disk <data disk> [journal disk]'

Activate:
 - mount the volume in a temp location
 - allocate an osd id (if needed)
 - remount in the correct location /var/lib/ceph/osd/$cluster-$id
 - start ceph-osd

 - triggered by udev when it sees the OSD gpt partition type
 - triggered by admin 'ceph-disk activate <path>'
 - triggered on ceph service startup with 'ceph-disk activate-all'

We rely on /dev/disk/by-partuuid to find partitions by their UUID;
this is what the journal symlink inside the osd data volume normally
points to.

activate-all relies on /dev/disk/by-parttype-uuid/$typeuuid.$uuid to
find all partitions.  We install special udev rules to create these
links.

udev triggers 'ceph-disk activate <dev>' or 'ceph-disk
activate-journal <dev>' based on the partition type.

On old distros (e.g., RHEL6), the blkid installed does not recognize
GPT partition metadata and the /dev/disk/by-partuuid etc. links aren't
present.  We have a horrible hack in the form of ceph-disk-udev that
parses gparted output to create the symlinks above and triggers the
'ceph-disk activate' etc commands that udev normally would do if it
knew the GPT partition type.

"""

# Expected content of the 'magic' file in an OSD data directory.
CEPH_OSD_ONDISK_MAGIC = 'ceph osd volume v026'

# GPT partition type GUIDs.  The DMCRYPT_* variants are used when a
# dm-crypt key is in play; TOBE_* presumably mark partitions that are
# still being prepared -- TODO confirm against the prepare code.
JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-b4b80ceff106'
DMCRYPT_JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-5ec00ceff106'
OSD_UUID = '4fbd7e29-9d25-41b8-afd0-062c0ceff05d'
DMCRYPT_OSD_UUID = '4fbd7e29-9d25-41b8-afd0-5ec00ceff05d'
TOBE_UUID = '89c57f98-2fe5-4dc0-89c1-f3ad0ceff2be'
DMCRYPT_TOBE_UUID = '89c57f98-2fe5-4dc0-89c1-5ec00ceff2be'

DEFAULT_FS_TYPE = 'xfs'

# Default mount options, keyed by filesystem type.
MOUNT_OPTIONS = {
    'btrfs': 'noatime,user_subvol_rm_allowed',
    # user_xattr is default ever since linux 2.6.39 / 3.0, but we'll
    # delay a moment before removing it fully because we did have some
    # issues with ext4 before the xatts-in-leveldb work, and it seemed
    # that user_xattr helped
    'ext4': 'noatime,user_xattr',
    'xfs': 'noatime,inode64',
}

# Extra mkfs arguments, keyed by filesystem type.
MKFS_ARGS = {
    'btrfs': [
        '-m', 'single',
        '-l', '32768',
        '-n', '32768',
    ],
    'xfs': [
        # xfs insists on not overwriting previous fs; even if we wipe
        # partition table, we often recreate it exactly the same way,
        # so we'll see ghosts of filesystems past
        '-f',
        '-i', 'size=2048',
    ],
}

INIT_SYSTEMS = ['upstart', 'sysvinit', 'systemd', 'auto', 'none']

STATEDIR = '/var/lib/ceph'

SYSCONFDIR = '/etc/ceph'

# only warn once about some things
warned_about = {}

# Drop TERM from the environment so that subprocesses are not confused
# by it; libreadline, for instance, prints odd control sequences for
# some TERM values.
os.environ.pop('TERM', None)

# Log under the module name, or under the script name when executed
# directly.
if __name__ == '__main__':
    LOG_NAME = os.path.basename(sys.argv[0])
else:
    LOG_NAME = __name__
LOG = logging.getLogger(LOG_NAME)


###### lock ########

class filelock(object):
    """
    Exclusive advisory lock (``fcntl.lockf``) backed by a lock file.

    ``acquire()`` opens (and creates if needed) the file ``fn`` and
    blocks until an exclusive lock is obtained; ``release()`` drops the
    lock and closes the file.  ``fd`` holds the open file object while
    the lock is held and is ``None`` otherwise.
    """

    def __init__(self, fn):
        self.fn = fn
        self.fd = None

    def acquire(self):
        assert not self.fd
        lock_file = open(self.fn, 'w')
        try:
            fcntl.lockf(lock_file, fcntl.LOCK_EX)
        except Exception:
            # do not leak the handle if locking fails
            lock_file.close()
            raise
        self.fd = lock_file

    def release(self):
        assert self.fd
        try:
            fcntl.lockf(self.fd, fcntl.LOCK_UN)
        finally:
            # close explicitly rather than relying on garbage collection
            self.fd.close()
            self.fd = None


###### exceptions ########


class Error(Exception):
    """
    Error
    """
    # NOTE: the class docstring is runtime text -- __str__ uses it as
    # the message prefix, and subclasses override the prefix simply by
    # providing their own docstring.

    def __str__(self):
        pieces = [self.__doc__.strip()]
        pieces.extend(str(arg) for arg in self.args)
        return ': '.join(pieces)


class MountError(Error):
    """
    Mounting filesystem failed
    """
    # The docstring doubles as the message prefix emitted by
    # Error.__str__; editing it changes user-visible output.


class UnmountError(Error):
    """
    Unmounting filesystem failed
    """
    # The docstring doubles as the message prefix emitted by
    # Error.__str__; editing it changes user-visible output.


class BadMagicError(Error):
    """
    Does not look like a Ceph OSD, or incompatible version
    """
    # The docstring doubles as the message prefix emitted by
    # Error.__str__; editing it changes user-visible output.


class TruncatedLineError(Error):
    """
    Line is truncated
    """
    # The docstring doubles as the message prefix emitted by
    # Error.__str__; editing it changes user-visible output.


class TooManyLinesError(Error):
    """
    Too many lines
    """
    # The docstring doubles as the message prefix emitted by
    # Error.__str__; editing it changes user-visible output.


class FilesystemTypeError(Error):
    """
    Cannot discover filesystem type
     """
    # The docstring doubles as the message prefix emitted by
    # Error.__str__ (which strips surrounding whitespace); editing it
    # changes user-visible output.


class CephDiskException(Exception):
    """
    Root of the ceph-disk exceptions that carry a custom (ad-hoc)
    message; they are meant to be caught and reported when main() runs.
    """


class ExecutableNotFound(CephDiskException):
    """
    Raised when an executable that is needed cannot be found in PATH
    """


####### utils


def maybe_mkdir(*a, **kw):
    """
    Creates a new directory if it doesn't exist, removes
    existing symlink before creating the directory.

    Arguments are passed through to ``os.mkdir``; the first positional
    argument is the path.  A symlink at that path is removed even when
    it is dangling.  An already existing directory is not an error.

    :raises: OSError if the directory cannot be created for a reason
    other than it already existing.
    """
    path = a[0] if a else kw.get('path')
    # islink() relies on lstat(), so unlike exists() it also spots a
    # symlink whose target is missing
    if path is not None and os.path.islink(path):
        LOG.debug('Removing old symlink at %s', path)
        os.unlink(path)
    try:
        os.mkdir(*a, **kw)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise


def which(executable):
    """
    Locate an executable.

    The directories of $PATH (or os.defpath when PATH is unset) are
    searched first, followed by a fixed list of common bin/sbin
    directories.

    :return: Path of the first existing match, or None.
    """
    search_path = os.environ.get('PATH', os.defpath)
    directories = search_path.split(os.pathsep) + [
        '/usr/local/bin',
        '/bin',
        '/usr/bin',
        '/usr/local/sbin',
        '/usr/sbin',
        '/sbin',
    ]

    for directory in directories:
        candidate = os.path.join(directory, executable)
        if os.path.exists(candidate):
            return candidate
    return None


def _get_command_executable(arguments):
    """
    Return the full path for an executable, raise if the executable is not
    found. If the executable has already a full path do not perform any checks.
    """
    if arguments[0].startswith('/'):  # an absolute path
        return arguments
    executable = which(arguments[0])
    if not executable:
        command_msg = 'Could not run command: %s' % ' '.join(arguments)
        executable_msg = '%s not in path.' % arguments[0]
        raise ExecutableNotFound('%s %s' % (executable_msg, command_msg))

    # swap the old executable for the new one
    arguments[0] = executable
    return arguments


def command(arguments, **kwargs):
    """
    Run a command through ``subprocess.Popen`` after making sure its
    executable can be found, so that a missing executable is reported
    with a helpful error.

    .. note:: Prefer this over calling ``subprocess.Popen`` directly.

    Extra keyword arguments are handed to ``subprocess.Popen``; stdout
    is always captured.

    :return: Tuple (output, returncode).
    """
    argv = _get_command_executable(arguments)
    LOG.info('Running command: %s', ' '.join(argv))
    child = subprocess.Popen(argv, stdout=subprocess.PIPE, **kwargs)
    stdout = child.communicate()[0]
    return stdout, child.returncode


def command_check_call(arguments):
    """
    Run a command through ``subprocess.check_call`` after making sure
    its executable can be found, so that a missing executable is
    reported with a helpful error.

    .. note:: Prefer this over calling ``subprocess.check_call``
    directly.

    :raises: subprocess.CalledProcessError on a non-zero exit status.
    :return: Whatever ``subprocess.check_call`` returns.
    """
    argv = _get_command_executable(arguments)
    command_line = ' '.join(argv)
    LOG.info('Running command: %s', command_line)
    return subprocess.check_call(argv)


def platform_distro():
    """
    Name of the distribution running on this machine, normalized:
    lower case and without leading or trailing whitespace.  Empty when
    the name is unknown.
    """
    name = platform_information()[0]
    if not name:
        name = ''
    return name.strip().lower()


def platform_information():
    """
    Describe the running distribution.

    On Debian the codename reported by the platform module may be
    empty; it is then derived from the release string.

    :return: Tuple (distro, release, codename) of stripped strings.
    """
    distro, release, codename = platform.linux_distribution()
    if not codename and 'debian' in distro.lower():  # this could be an empty string in Debian
        known_codenames = {'8': 'jessie', '7': 'wheezy', '6': 'squeeze'}
        codename = known_codenames.get(release.split('.')[0], '')

        # In order to support newer jessie/sid or wheezy/sid strings we test this
        # if sid is buried in the minor, we should use sid anyway.
        if not codename and '/' in release:
            major, minor = release.split('/')
            codename = minor if minor == 'sid' else major

    return tuple(
        str(field).strip() for field in (distro, release, codename)
    )


def get_dev_name(path):
    """
    Turn a device path into a device name.  e.g.::

        /dev/sda -> sda, /dev/cciss/c0d1 -> cciss!c0d1

    The '/dev/' prefix is dropped and every remaining '/' becomes '!'.
    The path must start with '/dev/'.
    """
    prefix = '/dev/'
    assert path.startswith(prefix)
    return path[len(prefix):].replace('/', '!')


def get_dev_path(name):
    """
    Turn a device name into a device path.  e.g.::

        sdb -> /dev/sdb, cciss!c0d1 -> /dev/cciss/c0d1

    Every '!' becomes '/' and '/dev/' is prepended.
    """
    relative = name.replace('!', '/')
    return ''.join(('/dev/', relative))


def get_dev_relpath(name):
    """
    Turn a device name (cciss!c0d1) into a path relative to /dev
    (cciss/c0d1).
    """
    return '/'.join(name.split('!'))


def get_dev_size(dev, size='megabytes'):
    """
    Attempt to get the size of a device so that we can prevent errors
    from actions to devices that are smaller, and improve error reporting.

    Because we want to avoid breakage in case this approach is not robust, we
    will issue a warning if we failed to get the size.

    :param size: bytes or megabytes (anything else means megabytes)
    :param dev: the device to calculate the size
    :return: The size as a whole number of the requested unit (rounded
    down), or None if seeking to the end of the device failed.
    """
    dividers = {'bytes': 1, 'megabytes': 1024 * 1024}
    fd = os.open(dev, os.O_RDONLY)
    try:
        device_size = os.lseek(fd, 0, os.SEEK_END)
        divider = dividers.get(size, 1024 * 1024)  # default to megabytes
        # floor division: the result stays an integer even when '/'
        # means true division
        return device_size // divider
    except Exception as error:
        LOG.warning('failed to get size of %s: %s', dev, error)
        return None
    finally:
        os.close(fd)


def get_partition_dev(dev, pnum):
    """
    Find the device path of partition number ``pnum`` of ``dev``.

    Partitions are assumed to be named like the base device followed by
    the number, optionally with some characters in between (like 'p').
    e.g.,

       sda 1 -> sda1
       cciss/c0d1 1 -> cciss!c0d1p1

    :raises: Error if no entry under /sys/block/<dev> matches.
    """
    name = get_dev_name(os.path.realpath(dev))
    suffix = str(pnum)
    # NOTE(review): matching on the suffix alone means 'sda11' is a
    # candidate for partition 1; the shortest candidate wins.
    candidates = [
        entry
        for entry in os.listdir(os.path.join('/sys/block', name))
        if entry.startswith(name) and entry.endswith(suffix)
    ]
    if not candidates:
        raise Error('partition %d for %s does not appear to exist' % (pnum, dev))
    # the shortest name that starts with the base name and ends with
    # the partition number (first one found on a tie)
    return get_dev_path(min(candidates, key=len))


def list_all_partitions():
    """
    Map every block device found in /sys/block to the list of its
    partitions.

    Floppy devices (fdN) and entries without a 'device' node are left
    out.

    :return: Dict of device name -> list of partition names.
    """
    def wanted(name):
        # /dev/fd0 may hang http://tracker.ceph.com/issues/6827
        if re.match(r'^fd\d$', name):
            return False
        return os.path.exists(os.path.join('/sys/block', name, 'device'))

    return dict(
        (name, list_partitions(name))
        for name in os.listdir('/sys/block')
        if wanted(name)
    )


def list_partitions(basename):
    """
    List the partitions of a device: the entries of
    /sys/block/<basename> whose name starts with the device name.
    """
    sys_dir = os.path.join('/sys/block', basename)
    return [entry for entry in os.listdir(sys_dir) if entry.startswith(basename)]

def get_partition_base(dev):
    """
    Find the whole-disk device a partition belongs to.

    :raises: Error if dev is not a block device, is itself a whole
    disk, or has no parent in /sys/block.
    :return: Path of the base device (/dev/...).
    """
    sys_block = '/sys/block'
    dev = os.path.realpath(dev)
    if not stat.S_ISBLK(os.lstat(dev).st_mode):
        raise Error('not a block device', dev)

    name = get_dev_name(dev)
    if os.path.exists(os.path.join(sys_block, name)):
        raise Error('not a partition', dev)

    # the base is the first device that has an entry named after us
    parents = (
        candidate
        for candidate in os.listdir(sys_block)
        if os.path.exists(os.path.join(sys_block, candidate, name))
    )
    parent = next(parents, None)
    if parent is None:
        raise Error('no parent device for partition', dev)
    return '/dev/' + parent

def is_partition(dev):
    """
    Tell a partition from a whole disk.

    :raises: Error if dev is not a block device, or is neither a disk
    nor a partition known to /sys/block.
    :return: True for a partition, False for a whole disk.
    """
    sys_block = '/sys/block'
    dev = os.path.realpath(dev)
    if not stat.S_ISBLK(os.lstat(dev).st_mode):
        raise Error('not a block device', dev)

    name = get_dev_name(dev)
    if os.path.exists(os.path.join(sys_block, name)):
        return False

    # make sure it is a partition of something else
    has_parent = any(
        os.path.exists(os.path.join(sys_block, candidate, name))
        for candidate in os.listdir(sys_block)
    )
    if has_parent:
        return True

    raise Error('not a disk or partition', dev)


def is_mounted(dev):
    """
    Check if the given device is mounted.

    Both the given device and the devices listed in /proc/mounts are
    resolved with realpath before they are compared.

    :return: The mount point of the device, or None if it is not
    mounted.
    """
    dev = os.path.realpath(dev)
    # open() instead of the Python 2 only file() builtin
    with open('/proc/mounts', 'r') as proc_mounts:
        for line in proc_mounts:
            fields = line.split()
            if len(fields) < 3:
                continue
            mounts_dev, path = fields[0], fields[1]
            # skip pseudo filesystems (proc, sysfs, ...) and stale entries
            if not (mounts_dev.startswith('/') and os.path.exists(mounts_dev)):
                continue
            if os.path.realpath(mounts_dev) == dev:
                return path
    return None


def is_held(dev):
    """
    List the devices holding the given device (e.g., a dm-crypt
    mapping), as found in its sysfs 'holders' directory.

    :return: List of holder names; empty if there is none or if no
    'holders' directory was found.
    """
    assert os.path.exists(dev)
    part = get_dev_name(os.path.realpath(dev))

    # full disk?
    directory = '/sys/block/{base}/holders'.format(base=part)
    if os.path.exists(directory):
        return os.listdir(directory)

    # partition?  try every prefix of the name, longest first, as the
    # name of the base device
    for end in range(len(part), 0, -1):
        directory = '/sys/block/{base}/{part}/holders'.format(
            part=part,
            base=part[:end],
        )
        if os.path.exists(directory):
            return os.listdir(directory)
    return []


def _verify_single_dev_not_in_use(path):
    """
    Raise Error if this one device node is mounted or held.
    """
    if is_mounted(path):
        raise Error('Device is mounted', path)
    holders = is_held(path)
    if holders:
        raise Error(
            'Device %s is in use by a device-mapper mapping (dm-crypt?)' % path,
            ','.join(holders),
        )


def verify_not_in_use(dev, check_partitions=False):
    """
    Verify if a given device (path) is in use (e.g. mounted or
    in use by device-mapper).

    :param dev: Path of the device, which must exist.
    :param check_partitions: When true and dev is a whole disk, each of
    its partitions is verified as well.
    :raises: Error if device is in use.
    """
    assert os.path.exists(dev)
    _verify_single_dev_not_in_use(dev)

    if check_partitions and not is_partition(dev):
        basename = get_dev_name(os.path.realpath(dev))
        for partname in list_partitions(basename):
            _verify_single_dev_not_in_use(get_dev_path(partname))


def must_be_one_line(line):
    """
    Check that the given text is exactly one newline-terminated line.

    :raises: TruncatedLineError if the text does not end with a
    newline, TooManyLinesError if it holds more than one line.
    :return: The line without its trailing newline.
    """
    content, terminator = line[:-1], line[-1:]
    if terminator != '\n':
        raise TruncatedLineError(line)
    if '\n' in content:
        raise TooManyLinesError(content)
    return content


def read_one_line(parent, name):
    """
    Read a file whose sole contents are a single line.

    Strips the newline.

    :param parent: Directory holding the file.
    :param name: Name of the file.
    :raises: Error if the file does not hold exactly one
    newline-terminated line; IOError if it cannot be read for a reason
    other than not existing.
    :return: Contents of the line, or None if file did not exist.
    """
    path = os.path.join(parent, name)
    try:
        # 'with' closes the handle right away instead of leaking it;
        # text mode is the same as binary on POSIX Python 2
        with open(path, 'r') as one_line_file:
            line = one_line_file.read()
    except IOError as e:
        if e.errno == errno.ENOENT:
            return None
        else:
            raise

    try:
        line = must_be_one_line(line)
    except (TruncatedLineError, TooManyLinesError) as e:
        raise Error(
            'File is corrupt: {path}: {msg}'.format(
                path=path,
                msg=e,
            )
        )
    return line


def write_one_line(parent, name, text):
    """
    Write a file whose sole contents are a single line.

    Adds a newline.  The line is written to a temporary file next to
    the destination, synced to disk and then renamed over the
    destination.

    :param parent: Directory holding the file.
    :param name: Name of the file.
    :param text: The line, without trailing newline.
    """
    path = os.path.join(parent, name)
    tmp = '{path}.{pid}.tmp'.format(path=path, pid=os.getpid())
    with open(tmp, 'w') as tmp_file:
        tmp_file.write(text + '\n')
        # push the data out of the Python level buffer first: fsync()
        # only covers what has reached the file descriptor
        tmp_file.flush()
        os.fsync(tmp_file.fileno())
    os.rename(tmp, path)


def check_osd_magic(path):
    """
    Make sure the directory at path carries the Ceph OSD magic, i.e.
    that its 'magic' file holds CEPH_OSD_ONDISK_MAGIC.

    :raises: BadMagicError if the 'magic' file is missing (probably not
    mkfs'ed yet) or holds something else.
    """
    found = read_one_line(path, 'magic')
    if found is None or found != CEPH_OSD_ONDISK_MAGIC:
        raise BadMagicError(path)


def check_osd_id(osd_id):
    """
    Make sure an osd id is made of digits only.

    :raises: Error if it is not.
    """
    if re.match(r'^[0-9]+$', osd_id) is None:
        raise Error('osd id is not numeric', osd_id)


def allocate_osd_id(
    cluster,
    fsid,
    keyring,
    ):
    """
    Allocates an OSD id on the given cluster by running
    'ceph osd create' as client.bootstrap-osd.

    :raises: Error if the call to allocate the OSD id fails or its
    output is not a single numeric line.
    :return: The allocated OSD id.
    """
    LOG.debug('Allocating OSD id...')
    create_cmd = [
        'ceph',
        '--cluster', cluster,
        '--name', 'client.bootstrap-osd',
        '--keyring', keyring,
        'osd', 'create', '--concise',
        fsid,
    ]
    try:
        raw_id = _check_output(args=create_cmd)
    except subprocess.CalledProcessError as e:
        raise Error('ceph osd create failed', e, e.output)
    osd_id = must_be_one_line(raw_id)
    check_osd_id(osd_id)
    return osd_id


def get_osd_id(path):
    """
    Read the id of the OSD at the given path from its 'whoami' file.

    :raises: Error if the id is not numeric.
    :return: The OSD id, or None if there is no 'whoami' file.
    """
    osd_id = read_one_line(path, 'whoami')
    if osd_id is None:
        return None
    check_osd_id(osd_id)
    return osd_id


def _check_output(args=None, **kwargs):
    """
    Run a command with command() and return its output.

    :raises: subprocess.CalledProcessError, with the captured output in
    its 'output' attribute, if the command exits with a non-zero
    status.
    """
    out, ret = command(args, **kwargs)
    if not ret:
        return out
    failure = subprocess.CalledProcessError(ret, args[0])
    failure.output = out
    raise failure


def get_conf(cluster, variable):
    """
    Look up a configuration variable of the cluster with ceph-conf
    (using the name 'osd.').

    :raises: Error if call to ceph-conf fails.
    :return: The variable value, or None when it is unset or empty.
    """
    lookup_cmd = [
        'ceph-conf',
        '--cluster={cluster}'.format(cluster=cluster),
        '--name=osd.',
        '--lookup',
        variable,
    ]
    try:
        out, ret = command(lookup_cmd, close_fds=True)
    except OSError as e:
        raise Error('error executing ceph-conf', e)

    if ret == 1:
        # config entry not found
        return None
    if ret != 0:
        raise Error('getting variable from configuration failed')

    first_line = out.split('\n', 1)[0]
    # don't differentiate between "var=" and no var set
    return first_line or None


def get_conf_with_default(cluster, variable):
    """
    Get a config value that is known to the C++ code, by asking
    ceph-osd to show it.

    This will fail if called on variables that are not defined in
    common config options.

    :raises: Error if ceph-osd exits with a non-zero status.
    :return: First line of the ceph-osd output.
    """
    show_cmd = [
        'ceph-osd',
        '--cluster={cluster}'.format(cluster=cluster),
        '--show-config-value={variable}'.format(variable=variable),
    ]
    try:
        out = _check_output(args=show_cmd, close_fds=True)
    except subprocess.CalledProcessError as e:
        raise Error(
            'getting variable from configuration failed',
            e,
            )

    return str(out).partition('\n')[0]


def get_fsid(cluster):
    """
    Get the fsid of the cluster, in lower case.

    :return: The fsid or raises Error.
    """
    fsid = get_conf_with_default(cluster=cluster, variable='fsid')
    # NOTE(review): get_conf_with_default() returns a string, so this
    # guard looks unreachable; an empty fsid is passed through as ''.
    if fsid is not None:
        return fsid.lower()
    raise Error('getting cluster uuid from configuration failed')


def get_or_create_dmcrypt_key(
    _uuid,
    key_dir,
    ):
    """
    Get path to dmcrypt key or create a new key file.

    A new key is 256 random bytes stored in key_dir under the name
    _uuid.  The key file is created readable and writable by its owner
    only, and a key directory created here is restricted to its owner.

    :raises: Error if the key cannot be created.
    :return: Path to the dmcrypt key file.
    """
    path = os.path.join(key_dir, _uuid)

    # already have it?
    if os.path.exists(path):
        return path

    # make a new key
    try:
        if not os.path.exists(key_dir):
            os.makedirs(key_dir)
            # restrict only the key directory itself, not any parent
            # that makedirs() had to create
            os.chmod(key_dir, stat.S_IRWXU)
        key = os.urandom(256)
        # create the file with mode 0600 from the start so that the
        # key is never readable by other users
        fd = os.open(
            path,
            os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
            stat.S_IRUSR | stat.S_IWUSR,
        )
        with os.fdopen(fd, 'wb') as key_file:
            key_file.write(key)
        return path
    except (OSError, IOError, NotImplementedError) as e:
        raise Error('unable to read or create dm-crypt key', path, e)


def dmcrypt_map(
    rawdev,
    keypath,
    _uuid,
    ):
    """
    Create a dmcrypt mapping named _uuid on top of rawdev with
    'cryptsetup create', using the key file at keypath.

    :raises: Error if cryptsetup fails.
    :return: Path to the dmcrypt device (/dev/mapper/<_uuid>).
    """
    mapped_dev = '/dev/mapper/' + _uuid
    create_cmd = [
        'cryptsetup',
        '--key-file', keypath,
        '--key-size', '256',
        'create', _uuid, rawdev,
    ]
    try:
        command_check_call(create_cmd)
    except subprocess.CalledProcessError as e:
        raise Error('unable to map device', rawdev, e)
    return mapped_dev


def dmcrypt_unmap(
    _uuid
    ):
    """
    Remove the dmcrypt mapping named _uuid with 'cryptsetup remove'.

    :raises: Error if cryptsetup fails.
    """
    remove_cmd = ['cryptsetup', 'remove', _uuid]
    try:
        command_check_call(remove_cmd)
    except subprocess.CalledProcessError as e:
        raise Error('unable to unmap device', _uuid, e)


def mount(
    dev,
    fstype,
    options,
    ):
    """
    Mount a device on a freshly created temporary directory under
    STATEDIR/tmp.

    :param dev: The device to mount; may not be None.
    :param fstype: The filesystem type; may not be None.
    :param options: The mount options; None selects the MOUNT_OPTIONS
    entry of the filesystem type (or no option at all).
    :raises: ValueError on a None dev or fstype, MountError if the
    mount command fails.
    :return: Path of the mount point.
    """
    # sanity check: dev and fstype are mandatory
    if dev is None:
        raise ValueError('dev may not be None')
    if fstype is None:
        raise ValueError('fstype may not be None')

    # pick best-of-breed mount options based on fs type
    if options is None:
        options = MOUNT_OPTIONS.get(fstype, '')

    # mount
    path = tempfile.mkdtemp(prefix='mnt.', dir=STATEDIR + '/tmp')
    mount_cmd = ['mount', '-t', fstype, '-o', options, '--', dev, path]
    try:
        LOG.debug('Mounting %s on %s with options %s', dev, path, options)
        command_check_call(mount_cmd)
    except subprocess.CalledProcessError as e:
        # best effort removal of the now useless mount point
        try:
            os.rmdir(path)
        except (OSError, IOError):
            pass
        raise MountError(e)

    return path


def unmount(
    path,
    ):
    """
    Unmount the filesystem mounted at path, then remove the (empty)
    mount point directory.

    :raises: UnmountError if the umount command fails.
    """
    umount_cmd = ['/bin/umount', '--', path]
    try:
        LOG.debug('Unmounting %s', path)
        command_check_call(umount_cmd)
    except subprocess.CalledProcessError as e:
        raise UnmountError(e)

    os.rmdir(path)


###########################################


def get_free_partition_index(dev):
    """
    Get the next free partition index on a given device, based on the
    machine readable output of 'parted print'.

    :raises: Error if the output of parted cannot be understood.
    :return: Index number (> 1 if there is already a partition on the device)
    or 1 if there is no partition table.
    """
    try:
        lines = _check_output(
            args=[
                'parted',
                '--machine',
                '--',
                dev,
                'print',
                ],
            )
    except subprocess.CalledProcessError as e:
        # reported through the logger like everything else in this
        # file, instead of a Python 2 only print statement
        LOG.warning(
            'cannot read partition index; assume it isn\'t present\n (Error: %s)',
            e,
        )
        return 1

    if not lines:
        raise Error('parted failed to output anything')
    lines = str(lines).splitlines(True)

    # work around buggy libreadline(?) library in rhel/centos.
    escape_prefix = '\x1b\x5b\x3f\x31\x30\x33\x34\x68'
    if lines[0].startswith(escape_prefix):
        lines[0] = lines[0][len(escape_prefix):]

    # first line: the units
    if lines[0] not in ['CHS;\n', 'CYL;\n', 'BYT;\n']:
        raise Error('weird parted units', lines[0])
    del lines[0]

    # second line: the disk itself; it may be missing altogether
    if not lines or not lines[0].startswith('/dev/'):
        raise Error('weird parted disk entry', lines[0] if lines else '')
    del lines[0]

    # remaining lines: one partition each, starting with '<index>:'
    seen = set()
    for line in lines:
        if not line.strip():
            continue
        idx, _ = line.split(':', 1)
        seen.add(int(idx))

    num = 1
    while num in seen:
        num += 1
    return num


def zap(dev):
    """
    Destroy the partition table and content of a given disk.

    The last 33 * 4096 bytes of the device are zeroed first, then
    'sgdisk --zap-all --clear --mbrtogpt' is run on it.

    :raises: Error if sgdisk fails.
    """
    try:
        LOG.debug('Zapping partition table on %s', dev)

        # try to wipe out any GPT partition table backups.  sgdisk
        # isn't too thorough.
        lba_size = 4096
        size = 33 * lba_size
        # NOTE(review): 'wb' truncates a regular file; presumably dev
        # is always a block device -- confirm with callers.
        with open(dev, 'wb') as dev_file:
            dev_file.seek(-size, os.SEEK_END)
            # bytes, as required by a file opened in binary mode
            dev_file.write(size * b'\0')

        command_check_call(
            [
                'sgdisk',
                '--zap-all',
                '--clear',
                '--mbrtogpt',
                '--',
                dev,
            ],
        )
    except subprocess.CalledProcessError as e:
        raise Error(e)


def prepare_journal_dev(
    data,
    journal,
    journal_size,
    journal_uuid,
    journal_dm_keypath,
    ):
    """
    Prepare a journal on a block device.

    - If journal is a partition whose type is JOURNAL_UUID, the
      partition is reused: only its name, GUID and type are set again.
    - If journal is any other partition, it is used as is.
    - Otherwise journal is a whole disk and a new partition of
      journal_size megabytes is created on it with sgdisk.

    :param data: The OSD data device; compared with journal to detect
    a disk shared by data and journal.
    :param journal: The journal block device (disk or partition).
    :param journal_size: Size of the journal, in megabytes.
    :param journal_uuid: The partition GUID to give to the journal.
    :param journal_dm_keypath: When set, the partition is typed as a
    dm-crypt journal and the returned journal path is in /dev/mapper.
    :raises: Error if the device is too small or sgdisk fails.
    :return: Tuple (journal path, raw partition path when dm-crypt is
    used else None, journal uuid); (journal, None, None) when the
    partition is used as is.
    """

    reusing_partition = False

    if is_partition(journal):
        LOG.debug('Journal %s is a partition', journal)
        LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data')
        if get_partition_type(journal) == JOURNAL_UUID:
            LOG.debug('Journal %s was previously prepared with ceph-disk. Reusing it.', journal)
            reusing_partition = True
            base = get_partition_base(journal)
            # what is left of the partition path once the base device
            # path is removed; converted with int() further down
            # NOTE(review): assumes the remainder is purely numeric
            # (sda1, not nvme0n1p1) -- confirm
            part = journal.replace(base,'')
            journal = base # needed for later
        else:
            LOG.warning('Journal %s was not prepared with ceph-disk. Symlinking directly.', journal)
            return (journal, None, None)

    # partition type: plain journal, or dm-crypt journal when a key is given
    ptype = JOURNAL_UUID
    if journal_dm_keypath:
        ptype = DMCRYPT_JOURNAL_UUID

    # it is a whole disk.  create a partition!
    num = None
    if journal == data and not reusing_partition:
        # we're sharing the disk between osd data and journal;
        # make journal be partition number 2, so it's pretty
        num = 2
        journal_part = '{num}:0:{size}M'.format(
            num=num,
            size=journal_size,
            )
    elif reusing_partition:
        num = int(part)
        journal_part = '' # not used in this case
    else:
        # sgdisk has no way for me to say "whatever is the next
        # free index number" when setting type guids etc, so we
        # need to awkwardly look up the next free number, and then
        # fix that in the call -- and hope nobody races with us;
        # then again nothing guards the partition table from races
        # anyway
        num = get_free_partition_index(dev=journal)
        journal_part = '{num}:0:+{size}M'.format(
            num=num,
            size=journal_size,
            )
        LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data')

    # size (in megabytes, the get_dev_size default) of the partition
    # being reused, or of the whole disk otherwise
    if reusing_partition:
        dev_size = get_dev_size(base+part)
    else:
        dev_size = get_dev_size(journal)

    if journal_size > dev_size:
        LOG.error('refusing to create journal on %s' % journal)
        LOG.error('journal size (%sM) is bigger than device (%sM)' % (journal_size, dev_size))
        raise Error(
            '%s device size (%sM) is not big enough for journal' % (journal, dev_size)
        )

    try:
        sgdisk_call = [
                'sgdisk',
                '--new={part}'.format(part=journal_part),
                '--change-name={num}:ceph journal'.format(num=num),
                '--partition-guid={num}:{journal_uuid}'.format(
                    num=num,
                    journal_uuid=journal_uuid,
                    ),
                '--typecode={num}:{uuid}'.format(
                    num=num,
                    uuid=ptype,
                    ),
                '--mbrtogpt',
                '--',
                journal,
            ]
        if reusing_partition:
            action= 'Reusing'
            del sgdisk_call[1] # don't add --new when reusing
        else:
            action = 'Creating'
        LOG.debug('%s journal partition num %d size %d on %s', action, num, journal_size, journal)
        command_check_call(sgdisk_call)

        # try to make sure the kernel refreshes the table.  note
        # that if this gets ebusy, we are probably racing with
        # udev because it already updated it.. ignore failure here.

        # On RHEL and CentOS distros, calling partprobe forces a reboot of the
        # server. Since we are not resizing partitons so we rely on calling
        # partx
        if platform_distro().startswith(('centos', 'red', 'scientific')):
            LOG.info('calling partx on prepared device %s', journal)
            LOG.info('re-reading known partitions will display errors')
            # command() does not raise on a non-zero exit status, so a
            # failure here is ignored
            command(
                [
                    'partx',
                    '-a',
                    journal,
                    ],
                )

        else:
            LOG.debug('Calling partprobe on prepared device %s', journal)
            command(
                [
                    'partprobe',
                    journal,
                    ],
                )

        # wait for udev event queue to clear
        command(
            [
                'udevadm',
                'settle',
                ],
            )

        # the journal is referred to by partition GUID rather than by
        # device name
        journal_symlink = '/dev/disk/by-partuuid/{journal_uuid}'.format(
            journal_uuid=journal_uuid,
            )

        journal_dmcrypt = None
        if journal_dm_keypath:
            # with dm-crypt the journal is the mapped device and the
            # by-partuuid path designates the raw partition underneath
            journal_dmcrypt = journal_symlink
            journal_symlink = '/dev/mapper/{uuid}'.format(uuid=journal_uuid)

        LOG.debug('Journal is GPT partition %s', journal_symlink)
        return (journal_symlink, journal_dmcrypt, journal_uuid)

    except subprocess.CalledProcessError as e:
        raise Error(e)


def prepare_journal_file(
    journal):
    """
    Prepare a journal stored in a regular file: the file is created
    empty when it does not exist and left alone otherwise.

    :return: Tuple (journal, None, None).
    """

    if not os.path.exists(journal):
        LOG.debug('Creating journal file %s with size 0 (ceph-osd will resize and allocate)', journal)
        # open() instead of the Python 2 only file() builtin
        with open(journal, 'wb'):
            pass

    LOG.debug('Journal is file %s', journal)
    LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data')
    return (journal, None, None)


def prepare_journal(
    data,
    journal,
    journal_size,
    journal_uuid,
    force_file,
    force_dev,
    journal_dm_keypath,
    ):
    """
    Prepare the journal, choosing the file or block device flavour from
    what ``journal`` is on disk.

    Returns ``(None, None, None)`` when no journal was given, otherwise
    whatever ``prepare_journal_file`` or ``prepare_journal_dev`` returns.

    Raises ``Error`` when the kind of journal found contradicts
    ``force_file`` / ``force_dev``, or when ``journal`` exists but is
    neither a regular file nor a block device.
    """
    # no journal requested at all
    if journal is None:
        if not force_dev:
            return (None, None, None)
        raise Error('Journal is unspecified; not a block device')

    # a path that does not exist yet can only become a journal file
    if not os.path.exists(journal):
        if not force_dev:
            return prepare_journal_file(journal)
        raise Error('Journal does not exist; not a block device', journal)

    journal_mode = os.stat(journal).st_mode

    if stat.S_ISREG(journal_mode):
        if not force_dev:
            return prepare_journal_file(journal)
        raise Error('Journal is not a block device', journal)

    if stat.S_ISBLK(journal_mode):
        if not force_file:
            return prepare_journal_dev(
                data,
                journal,
                journal_size,
                journal_uuid,
                journal_dm_keypath,
                )
        raise Error('Journal is not a regular file', journal)

    raise Error('Journal %s is neither a block device nor regular file' % journal)


def adjust_symlink(target, path):
    """
    Make ``path`` a symlink pointing at ``target``.

    An existing regular file, or a symlink pointing somewhere else, is
    removed first.  A symlink that already points at ``target`` is left
    alone.  Any other kind of entry at ``path`` is not removed, so the
    symlink creation then fails.

    :param target: what the symlink should point to
    :param path: where the symlink lives
    :raises Error: if the old entry cannot be inspected or removed, or
                   if the new symlink cannot be created
    """
    create = True
    if os.path.lexists(path):
        try:
            mode = os.lstat(path).st_mode
            if stat.S_ISREG(mode):
                LOG.debug('Removing old file %s', path)
                os.unlink(path)
            elif stat.S_ISLNK(mode):
                old = os.readlink(path)
                if old != target:
                    LOG.debug('Removing old symlink %s -> %s', path, old)
                    os.unlink(path)
                else:
                    create = False
        except OSError as e:
            # only filesystem failures are translated; a bare except would
            # also swallow KeyboardInterrupt / SystemExit
            raise Error('unable to remove (or adjust) old file (symlink)', path, e)
    if create:
        LOG.debug('Creating symlink %s -> %s', path, target)
        try:
            os.symlink(target, path)
        except OSError as e:
            raise Error('unable to create symlink %s -> %s' % (path, target), e)


def prepare_dir(
    path,
    journal,
    cluster_uuid,
    osd_uuid,
    journal_uuid,
    journal_dmcrypt=None,
    ):
    """
    Populate ``path`` with the files that mark it as a prepared OSD
    data directory.

    Does nothing when a ``magic`` file is already present.  Otherwise it
    sets up the ``journal`` and ``journal_dmcrypt`` symlinks (removing a
    stale ``journal_dmcrypt`` entry when none is wanted) and writes
    ``ceph_fsid``, ``fsid``, optionally ``journal_uuid`` and, last of
    all, ``magic``.  A random uuid is generated when ``osd_uuid`` is
    None.
    """
    def entry(name):
        return os.path.join(path, name)

    if os.path.exists(entry('magic')):
        LOG.debug('Data dir %s already exists', path)
        return
    LOG.debug('Preparing osd data dir %s', path)

    if osd_uuid is None:
        osd_uuid = str(uuid.uuid4())

    # an external journal is referenced through a symlink
    if journal is not None:
        adjust_symlink(journal, entry('journal'))

    if journal_dmcrypt is None:
        # best effort: drop a leftover link, it is fine if there is none
        try:
            os.unlink(entry('journal_dmcrypt'))
        except OSError:
            pass
    else:
        adjust_symlink(journal_dmcrypt, entry('journal_dmcrypt'))

    write_one_line(path, 'ceph_fsid', cluster_uuid)
    write_one_line(path, 'fsid', osd_uuid)

    # only present when the journal is a tagged partition
    if journal_uuid is not None:
        write_one_line(path, 'journal_uuid', journal_uuid)

    write_one_line(path, 'magic', CEPH_OSD_ONDISK_MAGIC)


def prepare_dev(
    data,
    journal,
    fstype,
    mkfs_args,
    mount_options,
    cluster_uuid,
    osd_uuid,
    journal_uuid,
    journal_dmcrypt,
    osd_dm_keypath,
    ):
    """
    Prepare a data/journal combination to be used for an OSD.

    When ``data`` is a whole device, a partition is created on it with a
    "to be" type code, which is switched to the final OSD type code only
    after the filesystem has been created and populated.  When
    ``osd_dm_keypath`` is set the filesystem is created on a dm-crypt
    mapping of the partition and the dm-crypt type codes are used.

    The ``magic`` file is written last, so its presence is a reliable
    indicator of the whole sequence having completed.

    WARNING: This will unconditionally overwrite anything given to
    it.
    """
    if osd_dm_keypath:
        ptype_tobe, ptype_osd = DMCRYPT_TOBE_UUID, DMCRYPT_OSD_UUID
    else:
        ptype_tobe, ptype_osd = TOBE_UUID, OSD_UUID

    if is_partition(data):
        LOG.debug('OSD data device %s is a partition', data)
        rawdev = data
    else:
        LOG.debug('Creating osd partition on %s', data)
        create_partition = [
            'sgdisk',
            '--largest-new=1',
            '--change-name=1:ceph data',
            '--partition-guid=1:{osd_uuid}'.format(osd_uuid=osd_uuid),
            '--typecode=1:%s' % ptype_tobe,
            '--',
            data,
        ]
        try:
            command_check_call(create_partition)
            command(['partprobe', data])
            # wait for udev event queue to clear
            command(['udevadm', 'settle'])
        except subprocess.CalledProcessError as e:
            raise Error(e)

        rawdev = get_partition_dev(data, 1)

    # with dm-crypt the filesystem lives on the mapped device
    dev = dmcrypt_map(rawdev, osd_dm_keypath, osd_uuid) if osd_dm_keypath else rawdev

    try:
        mkfs_cmd = ['mkfs', '-t', fstype]
        if mkfs_args is None:
            mkfs_cmd.extend(MKFS_ARGS.get(fstype, []))
        else:
            mkfs_cmd.extend(mkfs_args.split())
            if fstype == 'xfs':
                mkfs_cmd.append('-f')  # always force
        mkfs_cmd += ['--', dev]

        try:
            LOG.debug('Creating %s fs on %s', fstype, dev)
            command_check_call(mkfs_cmd)
        except subprocess.CalledProcessError as e:
            raise Error(e)

        # mount options must not contain any whitespace
        if mount_options is not None:
            mount_options = "".join(mount_options.split())

        mounted_at = mount(dev=dev, fstype=fstype, options=mount_options)
        try:
            prepare_dir(
                path=mounted_at,
                journal=journal,
                cluster_uuid=cluster_uuid,
                osd_uuid=osd_uuid,
                journal_uuid=journal_uuid,
                journal_dmcrypt=journal_dmcrypt,
                )
        finally:
            unmount(mounted_at)
    finally:
        # a mapping was set up only if dev differs from the raw device
        if dev != rawdev:
            dmcrypt_unmap(osd_uuid)

    if not is_partition(data):
        # everything is in place: switch to the final partition type
        try:
            command_check_call(
                ['sgdisk', '--typecode=1:%s' % ptype_osd, '--', data],
                )
        except subprocess.CalledProcessError as e:
            raise Error(e)


def main_prepare(args):
    """
    Entry point of the ``prepare`` subcommand.

    Validates the data and journal paths, fills in unset settings
    (cluster uuid, fs type, mkfs/mount options, journal size, uuids)
    from the cluster configuration or defaults, prepares the journal and
    then the data directory or device, and finally asks the kernel to
    re-read the partition table of a block data device.

    Everything up to and including the data preparation runs under
    ``prepare_lock``; the partition table refresh runs after the lock
    has been released.

    :param args: parsed command line arguments
    :raises Error: on any preparation failure.  dm-crypt key files
                   obtained for this run are removed when the failure
                   happened before the preparation completed.
    """
    journal_dm_keypath = None
    osd_dm_keypath = None
    # True exactly while this call holds prepare_lock.  It doubles as
    # "preparation has not completed yet", because the lock is released
    # right after the data has been prepared.
    lock_held = False

    try:
        prepare_lock.acquire()  # noqa
        lock_held = True
        if not os.path.exists(args.data):
            if args.data_dev:
                raise Error('data path for device does not exist', args.data)
            if args.data_dir:
                raise Error('data path for directory does not exist', args.data)
            raise Error('data path does not exist', args.data)

        # in use?
        dmode = os.stat(args.data).st_mode
        if stat.S_ISBLK(dmode):
            verify_not_in_use(args.data, True)

        if args.journal and os.path.exists(args.journal):
            jmode = os.stat(args.journal).st_mode
            if stat.S_ISBLK(jmode):
                verify_not_in_use(args.journal, False)

        if args.zap_disk is not None:
            if stat.S_ISBLK(dmode) and not is_partition(args.data):
                zap(args.data)
            else:
                raise Error('not full block device; cannot zap', args.data)

        if args.cluster_uuid is None:
            args.cluster_uuid = get_fsid(cluster=args.cluster)
            if args.cluster_uuid is None:
                raise Error(
                    'must have fsid in config or pass --cluster-uuid=',
                    )

        # fs type: command line, then the two config spellings, then the
        # built-in default
        if args.fs_type is None:
            args.fs_type = get_conf(
                cluster=args.cluster,
                variable='osd_mkfs_type',
                )
            if args.fs_type is None:
                args.fs_type = get_conf(
                    cluster=args.cluster,
                    variable='osd_fs_type',
                    )
            if args.fs_type is None:
                args.fs_type = DEFAULT_FS_TYPE

        mkfs_args = get_conf(
            cluster=args.cluster,
            variable='osd_mkfs_options_{fstype}'.format(
                fstype=args.fs_type,
                ),
            )
        if mkfs_args is None:
            mkfs_args = get_conf(
                cluster=args.cluster,
                variable='osd_fs_mkfs_options_{fstype}'.format(
                    fstype=args.fs_type,
                    ),
                )

        mount_options = get_conf(
            cluster=args.cluster,
            variable='osd_mount_options_{fstype}'.format(
                fstype=args.fs_type,
                ),
            )
        if mount_options is None:
            mount_options = get_conf(
                cluster=args.cluster,
                variable='osd_fs_mount_options_{fstype}'.format(
                    fstype=args.fs_type,
                    ),
                )

        journal_size = get_conf_with_default(
            cluster=args.cluster,
            variable='osd_journal_size',
            )
        journal_size = int(journal_size)

        # colocate journal with data?
        if stat.S_ISBLK(dmode) and not is_partition(args.data) and args.journal is None and args.journal_file is None:
            LOG.info('Will colocate journal with data on %s', args.data)
            args.journal = args.data

        if args.journal_uuid is None:
            args.journal_uuid = str(uuid.uuid4())
        if args.osd_uuid is None:
            args.osd_uuid = str(uuid.uuid4())

        # dm-crypt keys?
        if args.dmcrypt:
            journal_dm_keypath = get_or_create_dmcrypt_key(args.journal_uuid, args.dmcrypt_key_dir)
            osd_dm_keypath = get_or_create_dmcrypt_key(args.osd_uuid, args.dmcrypt_key_dir)

        # prepare journal
        (journal_symlink, journal_dmcrypt, journal_uuid) = prepare_journal(
            data=args.data,
            journal=args.journal,
            journal_size=journal_size,
            journal_uuid=args.journal_uuid,
            force_file=args.journal_file,
            force_dev=args.journal_dev,
            journal_dm_keypath=journal_dm_keypath,
            )

        # prepare data
        if stat.S_ISDIR(dmode):
            if args.data_dev:
                raise Error('data path is not a block device', args.data)
            prepare_dir(
                path=args.data,
                journal=journal_symlink,
                cluster_uuid=args.cluster_uuid,
                osd_uuid=args.osd_uuid,
                journal_uuid=journal_uuid,
                journal_dmcrypt=journal_dmcrypt,
                )
        elif stat.S_ISBLK(dmode):
            if args.data_dir:
                raise Error('data path is not a directory', args.data)
            prepare_dev(
                data=args.data,
                journal=journal_symlink,
                fstype=args.fs_type,
                mkfs_args=mkfs_args,
                mount_options=mount_options,
                cluster_uuid=args.cluster_uuid,
                osd_uuid=args.osd_uuid,
                journal_uuid=journal_uuid,
                journal_dmcrypt=journal_dmcrypt,
                osd_dm_keypath=osd_dm_keypath,
                )
        else:
            raise Error('not a dir or block device', args.data)

        # Preparation is complete.  Clear the flag before releasing so the
        # lock is never released a second time by the cleanup below.
        lock_held = False
        prepare_lock.release()  # noqa

        if stat.S_ISBLK(dmode):
            # try to make sure the kernel refreshes the table.  note
            # that if this gets ebusy, we are probably racing with
            # udev because it already updated it.. ignore failure here.

            # On RHEL and CentOS distros, calling partprobe forces a reboot
            # of the server. Since we are not resizing partitions, we rely
            # on calling partx instead.
            if platform_distro().startswith(('centos', 'red', 'scientific')):
                LOG.info('calling partx on prepared device %s', args.data)
                LOG.info('re-reading known partitions will display errors')

                command(
                    [
                        'partx',
                        '-a',
                        args.data,
                        ],
                    )

            else:
                LOG.debug('Calling partprobe on prepared device %s', args.data)
                command(
                    [
                        'partprobe',
                        args.data,
                        ],
                    )

    except Error as e:
        # Only throw the dm-crypt keys away when the preparation itself
        # failed.  An error raised by the partition table refresh above
        # happens after the OSD is fully prepared, and its keys must stay.
        if lock_held:
            for keypath in (journal_dm_keypath, osd_dm_keypath):
                if not keypath:
                    continue
                try:
                    os.unlink(keypath)
                except OSError as unlink_error:
                    # do not let a cleanup failure hide the real error
                    LOG.warning('unable to remove dmcrypt key %s: %s', keypath, unlink_error)
        raise e
    finally:
        # release on every exit path (including non-Error exceptions),
        # but only if the lock is still ours
        if lock_held:
            lock_held = False
            prepare_lock.release()  # noqa


###########################


def mkfs(
    path,
    cluster,
    osd_id,
    fsid,
    keyring,
    ):
    """
    Initialize the OSD data directory at ``path``.

    First fetches the monitor map into ``<path>/activate.monmap`` using
    the bootstrap-osd identity and ``keyring``, then runs
    ``ceph-osd --mkfs --mkkey`` against ``path`` with that monmap.
    """
    monmap = os.path.join(path, 'activate.monmap')

    fetch_monmap = ['ceph']
    fetch_monmap += ['--cluster', cluster]
    fetch_monmap += ['--name', 'client.bootstrap-osd']
    fetch_monmap += ['--keyring', keyring]
    fetch_monmap += ['mon', 'getmap', '-o', monmap]
    command_check_call(fetch_monmap)

    osd_mkfs = ['ceph-osd']
    osd_mkfs += ['--cluster', cluster]
    osd_mkfs += ['--mkfs', '--mkkey']
    osd_mkfs += ['-i', osd_id]
    osd_mkfs += ['--monmap', monmap]
    osd_mkfs += ['--osd-data', path]
    osd_mkfs += ['--osd-journal', os.path.join(path, 'journal')]
    osd_mkfs += ['--osd-uuid', fsid]
    osd_mkfs += ['--keyring', os.path.join(path, 'keyring')]
    command_check_call(osd_mkfs)
    # TODO ceph-osd --mkfs removes the monmap file?
    # os.unlink(monmap)


def auth_key(
    path,
    cluster,
    osd_id,
    keyring,
    ):
    """
    Register the key in ``<path>/keyring`` for ``osd.<osd_id>`` with the
    cluster, authenticating as client.bootstrap-osd.

    The dumpling+ monitor capability is tried first; when that command
    exits with EACCES the older ``allow rwx`` capability is used
    instead.  Any other failure propagates as
    ``subprocess.CalledProcessError``.
    """
    def add_osd_key(mon_cap):
        command_check_call(
            [
                'ceph',
                '--cluster', cluster,
                '--name', 'client.bootstrap-osd',
                '--keyring', keyring,
                'auth', 'add', 'osd.{osd_id}'.format(osd_id=osd_id),
                '-i', os.path.join(path, 'keyring'),
                'osd', 'allow *',
                'mon', mon_cap,
            ],
        )

    try:
        # try dumpling+ cap scheme
        add_osd_key('allow profile osd')
    except subprocess.CalledProcessError as err:
        if err.returncode != errno.EACCES:
            raise
        # try old cap scheme
        add_osd_key('allow rwx')


def move_mount(
    dev,
    path,
    cluster,
    osd_id,
    fstype,
    mount_options,
    ):
    """
    Mount ``dev`` at its final location ``STATEDIR/osd/<cluster>-<osd_id>``
    and lazily unmount the temporary mount at ``path``.

    When ``mount_options`` is None the per-filesystem defaults from
    ``MOUNT_OPTIONS`` are used.
    """
    LOG.debug('Moving mount to final location...')
    final_name = '{cluster}-{osd_id}'.format(cluster=cluster, osd_id=osd_id)
    osd_data = os.path.join(STATEDIR + '/osd', final_name)
    maybe_mkdir(osd_data)

    # fall back to the preferred options for this fs type
    if mount_options is None:
        mount_options = MOUNT_OPTIONS.get(fstype, '')

    # "mount --move" would be the natural tool, but it is not supported
    # when the parent mount is shared (the default on RH, Fedora and
    # probably others), and "--bind" does not maintain /etc/mtab
    # properly.  So the device is simply mounted a second time at the
    # final location and the temporary mount is dropped afterwards.
    command_check_call(
        ['/bin/mount', '-o', mount_options, '--', dev, osd_data],
        )
    # lazy unmount, in case someone else is peeking at the wrong moment
    command_check_call(
        ['/bin/umount', '-l', '--', path],
        )


def start_daemon(
    cluster,
    osd_id,
    ):
    """
    Start the ceph-osd daemon for ``osd.<osd_id>`` of ``cluster`` through
    the init system the OSD data directory is tagged with.

    The tag is a file named ``upstart``, ``sysvinit`` or ``systemd`` in
    ``STATEDIR/osd/<cluster>-<osd_id>``; they are checked in that order.

    Raises ``Error`` when no tag is present or when the start command
    fails.
    """
    LOG.debug('Starting %s osd.%s...', cluster, osd_id)

    osd_dir = (STATEDIR + '/osd/{cluster}-{osd_id}').format(
        cluster=cluster, osd_id=osd_id)

    def tagged_with(init_system):
        return os.path.exists(os.path.join(osd_dir, init_system))

    try:
        if tagged_with('upstart'):
            # "emit" rather than "start": start would fail if the instance
            # was already running.  "--no-wait" because the daemon having
            # started says little about the service being operational, so
            # there is no point in waiting for it.
            command_check_call(
                [
                    '/sbin/initctl',
                    'emit',
                    '--no-wait',
                    '--',
                    'ceph-osd',
                    'cluster={cluster}'.format(cluster=cluster),
                    'id={osd_id}'.format(osd_id=osd_id),
                ],
            )
        elif tagged_with('sysvinit'):
            svc = '/sbin/service'
            if os.path.exists('/usr/sbin/service'):
                svc = '/usr/sbin/service'
            command_check_call(
                [
                    svc,
                    'ceph',
                    '--cluster',
                    '{cluster}'.format(cluster=cluster),
                    'start',
                    'osd.{osd_id}'.format(osd_id=osd_id),
                ],
            )
        elif tagged_with('systemd'):
            for action in ('enable', 'start'):
                command_check_call(
                    [
                        'systemctl',
                        action,
                        'ceph-osd@{osd_id}'.format(osd_id=osd_id),
                    ],
                )
        else:
            raise Error('{cluster} osd.{osd_id} is not tagged with an init system'.format(
                cluster=cluster,
                osd_id=osd_id,
            ))
    except subprocess.CalledProcessError as e:
        raise Error('ceph osd start failed', e)


def detect_fstype(
    dev,
    ):
    """
    Return the filesystem type that blkid probes on ``dev``.

    blkid is run with ``-p`` so that the device itself is probed rather
    than a cached result being returned, and its output is reduced to
    the bare TYPE value.

    :param dev: path of the device to probe
    :return: the blkid output after it went through ``must_be_one_line``
    """
    fstype = _check_output(
        args=[
            '/sbin/blkid',
            # we don't want stale cached results
            '-p',
            '-s', 'TYPE',
            # '-o' and 'value' have to be separate list items: without
            # the comma Python concatenates the adjacent literals into
            # the single argument '-ovalue'
            '-o', 'value',
            '--',
            dev,
            ],
        )
    fstype = must_be_one_line(fstype)
    return fstype


def mount_activate(
    dev,
    activate_key_template,
    init,
    ):
    """
    Mount ``dev`` in a temporary location, activate the OSD found there
    and move the mount to ``STATEDIR/osd/<cluster>-<osd_id>``.

    If the same device is already mounted at the final location the
    temporary mount is simply dropped; if something else occupies the
    final location an ``Error`` is raised.  The temporary mount point
    is removed in every case.

    Returns ``(cluster, osd_id)``.  Raises ``FilesystemTypeError`` when
    the filesystem type of ``dev`` cannot be determined.
    """
    try:
        fstype = detect_fstype(dev=dev)
    except (subprocess.CalledProcessError,
            TruncatedLineError,
            TooManyLinesError) as e:
        raise FilesystemTypeError('device {dev}'.format(dev=dev), e)

    # TODO always using mount options from cluster=ceph for
    # now; see http://tracker.newdream.net/issues/3253
    mount_options = None
    for template in ('osd_mount_options_{fstype}',
                     'osd_fs_mount_options_{fstype}'):
        mount_options = get_conf(
            cluster='ceph',
            variable=template.format(fstype=fstype),
            )
        if mount_options is not None:
            break

    # mount options must not contain any whitespace
    if mount_options is not None:
        mount_options = "".join(mount_options.split())

    path = mount(dev=dev, fstype=fstype, options=mount_options)

    osd_id = None
    cluster = None
    try:
        (osd_id, cluster) = activate(path, activate_key_template, init)

        # Work out whether this disk is already mounted at the final
        # location (active) or whether something else lives there (other).
        active = False
        other = False
        src_dev = os.stat(path).st_dev
        try:
            final = (STATEDIR + '/osd/{cluster}-{osd_id}').format(
                cluster=cluster,
                osd_id=osd_id)
            dst_dev = os.stat(final).st_dev
            if dst_dev == src_dev:
                active = True
            elif dst_dev != os.stat(STATEDIR + '/osd').st_dev or os.listdir(final):
                # either a different filesystem is mounted there, or the
                # plain directory is not empty
                other = True
        except OSError:
            pass

        if active:
            LOG.info('%s osd.%s already mounted in position; unmounting ours.' % (cluster, osd_id))
            unmount(path)
        elif other:
            raise Error('another %s osd.%s already mounted in position (old/different cluster instance?); unmounting ours.' % (cluster, osd_id))
        else:
            move_mount(
                dev=dev,
                path=path,
                cluster=cluster,
                osd_id=osd_id,
                fstype=fstype,
                mount_options=mount_options,
                )
        return (cluster, osd_id)

    except:
        # cleanup only; the exception is re-raised untouched
        LOG.error('Failed to activate')
        unmount(path)
        raise
    finally:
        # remove our temp dir
        if os.path.exists(path):
            os.rmdir(path)


def activate_dir(
    path,
    activate_key_template,
    init,
    ):
    """
    Activate the OSD whose data lives in the directory ``path``.

    Unless ``init`` is None or ``'none'``, the canonical location
    ``STATEDIR/osd/<cluster>-<osd_id>`` is made a symlink to ``path``
    (when ``path`` is not already that location).  A symlink pointing
    elsewhere is replaced.

    :return: ``(cluster, osd_id)``
    :raises Error: if ``path`` does not exist, if the canonical location
                   is occupied by something that is not a symlink, or if
                   the symlink cannot be removed or created
    """
    if not os.path.exists(path):
        raise Error(
            'directory %s does not exist' % path
            )

    (osd_id, cluster) = activate(path, activate_key_template, init)

    if init not in (None, 'none'):
        canonical = (STATEDIR + '/osd/{cluster}-{osd_id}').format(
            cluster=cluster,
            osd_id=osd_id)
        if path != canonical:
            # symlink it from the proper location
            create = True
            if os.path.lexists(canonical):
                # os.readlink() raises OSError on anything but a symlink;
                # report that situation as a regular Error instead
                if not os.path.islink(canonical):
                    raise Error(
                        '%s exists and is not a symlink; cannot link it to %s'
                        % (canonical, path)
                        )
                old = os.readlink(canonical)
                if old != path:
                    LOG.debug('Removing old symlink %s -> %s', canonical, old)
                    try:
                        os.unlink(canonical)
                    except OSError as e:
                        raise Error('unable to remove old symlink', canonical, e)
                else:
                    create = False
            if create:
                LOG.debug('Creating symlink %s -> %s', canonical, path)
                try:
                    os.symlink(path, canonical)
                except OSError as e:
                    raise Error('unable to create symlink %s -> %s' % (canonical, path), e)

    return (cluster, osd_id)


def find_cluster_by_uuid(_uuid):
    """
    Return the name of the cluster whose conf file in SYSCONFDIR carries
    the fsid ``_uuid`` (compared in lower case), or None.

    As a fallback, ``'ceph'`` is returned when ceph.conf is the only
    conf file without an fsid and no other conf file matched.
    """
    wanted = _uuid.lower()
    if not os.path.exists(SYSCONFDIR):
        return None

    suffix = '.conf'
    missing_fsid = []
    for entry in os.listdir(SYSCONFDIR):
        if not entry.endswith(suffix):
            continue
        cluster = entry[:-len(suffix)]
        try:
            fsid = get_fsid(cluster)
        except Error as e:
            # only "no fsid configured" is tolerated here
            if e.message != 'getting cluster uuid from configuration failed':
                raise e
            missing_fsid.append(cluster)
            continue
        if fsid == wanted:
            return cluster

    # be tolerant of /etc/ceph/ceph.conf without an fsid defined.
    if missing_fsid == ['ceph']:
        LOG.warning('No fsid defined in ' + SYSCONFDIR + '/ceph.conf; using anyway')
        return 'ceph'
    return None


def activate(
    path,
    activate_key_template,
    init,
    ):
    """
    Bring the prepared OSD data directory at ``path`` into an active
    state.

    Steps, each skipped when its marker already exists:
     - look up the cluster name from the ``ceph_fsid`` file
     - allocate an OSD id and record it in ``whoami``
     - run mkfs unless a ``ready`` file is present
     - tag the directory with the init system (``init``; ``'auto'`` is
       resolved from the ``init`` config variable or the distribution)
       and remove the tags of the other init systems
     - register the OSD key unless an ``active`` file is present

    :param path: mounted OSD data directory
    :param activate_key_template: format string for the bootstrap
           keyring path; ``{cluster}`` and ``{statedir}`` are filled in
    :param init: init system name, ``'auto'``, ``'none'`` or None
    :return: ``(osd_id, cluster)``
    :raises Error: if the cluster uuid, the cluster conf or the OSD uuid
                   cannot be found
    """

    check_osd_magic(path)

    ceph_fsid = read_one_line(path, 'ceph_fsid')
    if ceph_fsid is None:
        raise Error('No cluster uuid assigned.')
    LOG.debug('Cluster uuid is %s', ceph_fsid)

    cluster = find_cluster_by_uuid(ceph_fsid)
    if cluster is None:
        raise Error('No cluster conf found in ' + SYSCONFDIR + ' with fsid %s' % ceph_fsid)
    LOG.debug('Cluster name is %s', cluster)

    fsid = read_one_line(path, 'fsid')
    if fsid is None:
        raise Error('No OSD uuid assigned.')
    LOG.debug('OSD uuid is %s', fsid)

    keyring = activate_key_template.format(cluster=cluster,
                                           statedir=STATEDIR)

    osd_id = get_osd_id(path)
    if osd_id is None:
        osd_id = allocate_osd_id(
            cluster=cluster,
            fsid=fsid,
            keyring=keyring,
            )
        write_one_line(path, 'whoami', osd_id)
    LOG.debug('OSD id is %s', osd_id)

    if not os.path.exists(os.path.join(path, 'ready')):
        LOG.debug('Initializing OSD...')
        # re-running mkfs is safe, so just run until it completes
        mkfs(
            path=path,
            cluster=cluster,
            osd_id=osd_id,
            fsid=fsid,
            keyring=keyring,
            )

    if init not in (None, 'none'):
        if init == 'auto':
            conf_val = get_conf(
                cluster=cluster,
                variable='init'
                )
            if conf_val is not None:
                init = conf_val
            else:
                (distro, release, codename) = platform.dist()
                if distro == 'Ubuntu':
                    init = 'upstart'
                else:
                    init = 'sysvinit'

        LOG.debug('Marking with init system %s', init)
        # the marker is an empty file named after the init system;
        # open() instead of the file() builtin, same effect
        with open(os.path.join(path, init), 'w'):
            pass

    # remove markers for others, just in case.
    for other in INIT_SYSTEMS:
        if other != init:
            try:
                os.unlink(os.path.join(path, other))
            except OSError:
                pass

    if not os.path.exists(os.path.join(path, 'active')):
        LOG.debug('Authorizing OSD key...')
        auth_key(
            path=path,
            cluster=cluster,
            osd_id=osd_id,
            keyring=keyring,
            )
        write_one_line(path, 'active', 'ok')
    LOG.debug('%s osd.%s data dir is ready at %s', cluster, osd_id, path)
    return (osd_id, cluster)


def main_activate(args):
    """
    Entry point of the ``activate`` subcommand.

    Activates the OSD at ``args.path`` (a block device or a data
    directory) while holding ``activate_lock``.  Unless ``args.mark_init``
    is None or ``'none'`` the daemon is then started through the init
    system.  For a directory with ``args.mark_init == 'none'`` ceph-osd
    is run directly instead.

    Suppressed paths are skipped.  Raises ``Error`` when the path does
    not exist or is neither a directory nor a block device.
    """
    cluster = None
    osd_id = None

    if not os.path.exists(args.path):
        raise Error('%s does not exist' % args.path)

    if is_suppressed(args.path):
        LOG.info('suppressed activate request on %s', args.path)
        return

    activate_lock.acquire()  # noqa
    try:
        path_mode = os.stat(args.path).st_mode
        is_dir = stat.S_ISDIR(path_mode)

        if stat.S_ISBLK(path_mode):
            (cluster, osd_id) = mount_activate(
                dev=args.path,
                activate_key_template=args.activate_key_template,
                init=args.mark_init,
                )
        elif is_dir:
            (cluster, osd_id) = activate_dir(
                path=args.path,
                activate_key_template=args.activate_key_template,
                init=args.mark_init,
                )
        else:
            raise Error('%s is not a directory or block device' % args.path)

        # no init system involved for a directory: run the daemon directly
        if is_dir and args.mark_init == 'none':
            command_check_call(
                [
                    'ceph-osd',
                    '--cluster={cluster}'.format(cluster=cluster),
                    '--id={osd_id}'.format(osd_id=osd_id),
                    '--osd-data={path}'.format(path=args.path),
                    '--osd-journal={path}/journal'.format(path=args.path),
                ],
            )

        if args.mark_init not in (None, 'none'):
            start_daemon(cluster=cluster, osd_id=osd_id)

    finally:
        activate_lock.release()  # noqa


###########################

def get_journal_osd_uuid(path):
    """
    Ask ceph-osd for the uuid recorded in the journal on the block
    device ``path`` and return the first line of its output.

    Raises ``Error`` when ``path`` does not exist, is not a block device,
    or when ceph-osd fails.
    """
    if not os.path.exists(path):
        raise Error('%s does not exist' % path)

    if not stat.S_ISBLK(os.stat(path).st_mode):
        raise Error('%s is not a block device' % path)

    query = [
        'ceph-osd',
        '-i', '0',   # this is ignored
        '--get-journal-uuid',
        '--osd-journal',
        path,
    ]
    try:
        out = _check_output(args=query, close_fds=True)
    except subprocess.CalledProcessError as e:
        raise Error('failed to get osd uuid/fsid from journal', e)

    # only the first line of the output is of interest
    first_line, _, _ = str(out).partition('\n')
    LOG.debug('Journal %s has OSD UUID %s', path, first_line)
    return first_line


def main_activate_journal(args):
    """
    Entry point of the ``activate-journal`` subcommand.

    Reads the OSD uuid from the journal device ``args.dev``, activates
    the data partition found under /dev/disk/by-partuuid/ with that uuid
    and starts the daemon, all while holding ``activate_lock``.

    Raises ``Error`` when ``args.dev`` does not exist.
    """
    journal_dev = args.dev
    if not os.path.exists(journal_dev):
        raise Error('%s does not exist' % journal_dev)

    activate_lock.acquire()  # noqa
    try:
        data_uuid = get_journal_osd_uuid(journal_dev)
        data_dev = os.path.join('/dev/disk/by-partuuid/', data_uuid.lower())

        (cluster, osd_id) = mount_activate(
            dev=data_dev,
            activate_key_template=args.activate_key_template,
            init=args.mark_init,
            )
        start_daemon(cluster=cluster, osd_id=osd_id)
    finally:
        activate_lock.release()  # noqa


###########################


def main_activate_all(args):
    """
    Entry point of the ``activate-all`` subcommand.

    Scans /dev/disk/by-parttypeuuid for entries named
    ``<partition type uuid>.<partition uuid>`` and activates every one
    whose type is an OSD data partition (plain or dm-crypt), starting
    its daemon.  Each activation runs under ``activate_lock``.

    A failure on one partition is reported on stderr and does not stop
    the scan.

    :param args: parsed command line arguments
    :raises Error: after the scan, if at least one partition failed
    """
    scan_dir = '/dev/disk/by-parttypeuuid'
    LOG.debug('Scanning %s', scan_dir)
    if not os.path.exists(scan_dir):
        return
    err = False
    for name in os.listdir(scan_dir):
        # Split on the first dot only: unpacking name.split('.') would
        # raise ValueError on an entry with more than one dot and abort
        # the whole scan.
        (tag, dot, part_uuid) = name.partition('.')
        if not dot:
            continue

        if tag not in (OSD_UUID, DMCRYPT_OSD_UUID):
            continue

        if tag == DMCRYPT_OSD_UUID:
            path = os.path.join('/dev/mapper', part_uuid)
        else:
            path = os.path.join(scan_dir, name)

        LOG.info('Activating %s', path)
        activate_lock.acquire()  # noqa
        try:
            (cluster, osd_id) = mount_activate(
                dev=path,
                activate_key_template=args.activate_key_template,
                init=args.mark_init,
                )
            start_daemon(
                cluster=cluster,
                osd_id=osd_id,
                )

        except Exception as e:
            # deliberately broad: report and go on with the next partition
            sys.stderr.write('{prog}: {msg}\n'.format(
                prog=args.prog,
                msg=e,
                ))
            err = True

        finally:
            activate_lock.release()  # noqa
    if err:
        raise Error('One or more partitions failed to activate')


###########################

def is_swap(dev):
    """
    Tell whether ``dev`` is listed in /proc/swaps.

    Both ``dev`` and the listed paths are resolved with
    ``os.path.realpath`` before being compared, so symlinks to the same
    device match.

    :param dev: path of the device to look for
    :return: True if a listed swap path resolves to ``dev``, else False
    """
    dev = os.path.realpath(dev)
    # open() in text mode instead of file(..., 'rb'): the fields are then
    # str and can be compared with '/' and with ``dev``
    with open('/proc/swaps', 'r') as proc_swaps:
        # the first line is skipped
        for line in proc_swaps.readlines()[1:]:
            fields = line.split()
            if len(fields) < 3:
                continue
            swaps_dev = fields[0]
            if swaps_dev.startswith('/') and os.path.exists(swaps_dev):
                swaps_dev = os.path.realpath(swaps_dev)
                if swaps_dev == dev:
                    return True
    return False


def get_oneliner(base, name):
    """
    Return the first line of the file ``name`` in directory ``base`` with
    trailing whitespace stripped, or None when it is not a regular file.
    """
    target = os.path.join(base, name)
    if not os.path.isfile(target):
        return None
    with open(target, 'r') as handle:
        first_line = handle.readline()
    return first_line.rstrip()


def get_dev_fs(dev):
    """
    Return the filesystem type blkid reports for ``dev``, or None when
    its output does not mention TYPE.
    """
    blkid_out, _ = command(['blkid', '-s', 'TYPE', dev])
    if 'TYPE' not in blkid_out:
        return None
    # the second whitespace separated field is taken to be TYPE="<fstype>";
    # the value is whatever sits between its first pair of double quotes
    type_field = blkid_out.split()[1]
    return type_field.split('"')[1]


def get_partition_type(part):
    """
    Get the GPT partition type UUID.  If we have an old blkid and can't
    get it that way, use sgdisk and use the description instead (and hope
    dmcrypt isn't being used).

    :param part: path to the partition device
    :return: the ID_PART_ENTRY_TYPE value reported by blkid when present;
             OSD_UUID or JOURNAL_UUID when the sgdisk fallback recognizes
             the partition description; None in every other case
    """
    blkid, _ = command(
        [
            'blkid',
            '-p',
            '-o', 'udev',
            part,
        ]
    )
    saw_part_entry = False
    for line in blkid.splitlines():
        # Split on the first '=' only, so a value that itself contains
        # '=' (or a line without any '=') cannot break the unpacking.
        key, _sep, value = line.partition('=')
        if key == 'ID_PART_ENTRY_TYPE':
            return value
        if key.startswith('ID_PART_ENTRY_'):
            saw_part_entry = True

    # hmm, is it in fact GPT?
    table_type = None
    base = get_partition_base(part)
    blkid, _ = command(
        [
            'blkid',
            '-p',
            '-o', 'udev',
            base
        ]
    )
    for line in blkid.splitlines():
        key, _sep, value = line.partition('=')
        if key == 'ID_PART_TABLE_TYPE':
            table_type = value
    if table_type != 'gpt':
        return None    # not even GPT

    if saw_part_entry:
        return None    # GPT, and blkid appears to be new, so we're done.

    # bah, fall back to sgdisk.
    if 'blkid' not in warned_about:
        LOG.warning('Old blkid does not support ID_PART_ENTRY_* fields, trying sgdisk; may not correctly identify ceph volumes with dmcrypt')
        warned_about['blkid'] = True
    # NOTE(review): this split takes the first run of digits as the
    # partition number, so it presumably only suits names whose sole
    # digits are the trailing partition number -- confirm.
    (base, partnum) = re.match(r'(\D+)(\d+)', part).group(1, 2)
    sgdisk, _ = command(
        [
            'sgdisk',
            '-p',
            base,
        ]
    )

    for line in sgdisk.splitlines():
        # table row: number, start, end, size (value + unit), code, name
        m = re.search(r'\s+(\d+)\s+\d+\s+\d+\s+\S+ \S+B\s+\S+\s+(.*)', line)
        if m is not None:
            num = m.group(1)
            if num != partnum:
                continue
            desc = m.group(2)
            # assume unencrypted ... blkid has failed us :(
            if desc == 'ceph data':
                return OSD_UUID
            if desc == 'ceph journal':
                return JOURNAL_UUID

    return None


def get_partition_uuid(dev):
    """
    Look up the unique GUID of a partition with ``sgdisk -i``.

    ``dev`` is split into a non-digit prefix (handed to sgdisk as the
    disk) and the digits that follow it (the partition number).

    :return: the lower-cased GUID, or None when the sgdisk output has no
             'Partition unique GUID' line
    """
    pieces = re.match(r'(\D+)(\d+)', dev)
    disk, number = pieces.group(1, 2)
    listing, _ = command(['sgdisk', '-i', number, disk])
    for row in listing.splitlines():
        found = re.match(r'Partition unique GUID: (\S+)', row)
        if found is None:
            continue
        return found.group(1).lower()
    return None


def more_osd_info(path, uuid_map):
    """
    Build descriptive fragments from the one-line files of an OSD data
    directory.

    :param path: directory holding 'ceph_fsid', 'whoami' and 'journal_uuid'
    :param uuid_map: mapping from lower-case partition uuid to the value
                     shown after 'journal ' in the description
    :return: list of strings, possibly empty
    """
    info = []

    fsid = get_oneliner(path, 'ceph_fsid')
    if fsid:
        cluster_name = find_cluster_by_uuid(fsid)
        info.append(
            ('cluster ' + cluster_name) if cluster_name
            else ('unknown cluster ' + fsid)
        )

    osd_id = get_oneliner(path, 'whoami')
    if osd_id:
        info.append('osd.%s' % osd_id)

    raw_journal_uuid = get_oneliner(path, 'journal_uuid')
    if raw_journal_uuid:
        # uuid_map keys are looked up in lower case
        key = raw_journal_uuid.lower()
        if key in uuid_map:
            info.append('journal %s' % uuid_map[key])

    return info

def list_dev_osd(dev, uuid_map):
    """
    Describe the state of an OSD data device.

    :return: ['active', ...] when is_mounted() reports a path for the
             device; ['prepared', ...] when the device has a file system
             that, once temporarily mounted, contains a 'magic' file;
             an empty list otherwise (including when mounting fails)
    """
    mounted_at = is_mounted(dev)
    fs_type = get_dev_fs(dev)

    if mounted_at:
        return ['active'] + more_osd_info(mounted_at, uuid_map)

    info = []
    if not fs_type:
        return info

    try:
        tmp_mount = mount(dev=dev, fstype=fs_type, options='')
        if tmp_mount:
            try:
                if get_oneliner(tmp_mount, 'magic') is not None:
                    info.append('prepared')
                    info.extend(more_osd_info(tmp_mount, uuid_map))
            finally:
                # never leave the temporary mount behind
                unmount(tmp_mount)
    except MountError:
        pass
    return info

def list_dev(dev, uuid_map, journal_map):
    """
    Print a one-line description of a device.

    The line is '<prefix><dev> <comma separated description>', where the
    prefix is a single space for partitions and empty otherwise.

    :param dev: path to the device
    :param uuid_map: passed through to list_dev_osd()
    :param journal_map: mapping from journal partition uuid to the value
                        shown after 'for ' on journal partitions
    """
    def _journal_owner():
        # ['for <owner>'] when journal_map knows this partition's uuid
        guid = get_partition_uuid(dev)
        if guid and guid in journal_map:
            return ['for %s' % journal_map[guid]]
        return []

    if is_partition(dev):
        ptype = get_partition_type(dev)
        prefix = ' '
    else:
        ptype = 'unknown'
        prefix = ''

    if ptype == OSD_UUID:
        desc = ['ceph data'] + (list_dev_osd(dev, uuid_map) or ['unprepared'])
    elif ptype == DMCRYPT_OSD_UUID:
        holders = is_held(dev)
        if not holders:
            desc = ['ceph data (dmcrypt)', 'not currently mapped']
        elif len(holders) == 1:
            holder = '/dev/' + holders[0]
            fs_desc = list_dev_osd(holder, uuid_map)
            desc = ['ceph data (dmcrypt %s)' % holder] + fs_desc
        else:
            desc = ['ceph data (dmcrypt)', 'holders: ' + ','.join(holders)]
    elif ptype == JOURNAL_UUID:
        desc = ['ceph journal'] + _journal_owner()
    elif ptype == DMCRYPT_JOURNAL_UUID:
        holders = is_held(dev)
        if len(holders) == 1:
            desc = ['ceph journal (dmcrypt /dev/%s)' % holders[0]]
        else:
            desc = ['ceph journal (dmcrypt)']
        desc += _journal_owner()
    else:
        mounted_at = is_mounted(dev)
        fs_type = get_dev_fs(dev)
        desc = ['swap' if is_swap(dev) else 'other']
        if fs_type:
            desc.append(fs_type)
        elif ptype:
            desc.append(ptype)
        if mounted_at:
            desc.append('mounted on %s' % mounted_at)

    print('%s%s %s' % (prefix, dev, ', '.join(desc)))


def _record_journal_owner(journal_map, fs_dev, owner):
    """
    Temporarily mount ``fs_dev`` and, if it holds a 'journal_uuid' file,
    map that uuid (lower-cased) to ``owner`` in ``journal_map``.

    Devices without a detected file system, or that fail to mount, are
    silently ignored.
    """
    fs_type = get_dev_fs(fs_dev)
    if fs_type is None:
        return
    try:
        tpath = mount(dev=fs_dev, fstype=fs_type, options='')
        try:
            journal_uuid = get_oneliner(tpath, 'journal_uuid')
            if journal_uuid:
                journal_map[journal_uuid.lower()] = owner
        finally:
            unmount(tpath)
    except MountError:
        pass


def main_list(args):
    """
    Entry point of the 'list' subcommand: print every disk and partition
    with a description.

    A first pass over all partitions builds two maps (partition uuid to
    device, journal uuid to the data partition referencing it); a second
    pass prints one line per device through list_dev().
    """
    partmap = list_all_partitions()

    uuid_map = {}
    journal_map = {}
    for _, parts in sorted(partmap.items()):
        for p in parts:
            dev = get_dev_path(p)
            part_uuid = get_partition_uuid(dev)
            if part_uuid:
                uuid_map[part_uuid] = dev
            ptype = get_partition_type(dev)
            if ptype == OSD_UUID:
                _record_journal_owner(journal_map, dev, dev)
            if ptype == DMCRYPT_OSD_UUID:
                holders = is_held(dev)
                if len(holders) == 1:
                    # the file system lives on the holder device, but the
                    # journal is reported against the partition itself
                    _record_journal_owner(
                        journal_map, '/dev/' + holders[0], dev)

    for base, parts in sorted(partmap.items()):
        if not parts:
            list_dev(get_dev_path(base), uuid_map, journal_map)
            continue
        print('%s :' % get_dev_path(base))
        for p in sorted(parts):
            list_dev(get_dev_path(p), uuid_map, journal_map)


###########################
#
# Mark devices that we want to suppress activates on with a
# file like
#
#  /var/lib/ceph/tmp/suppress-activate.sdb
#
# where the last bit is the sanitized device name (/dev/X without the
# /dev/ prefix) and the is_suppressed() check matches a prefix.  That
# means suppressing sdb will stop activate on sdb1, sdb2, etc.
#

def is_suppressed(path):
    """
    Tell whether activation is suppressed for a device.

    The device name is checked against the suppress marker files, then
    shortened one character at a time, so a marker set on a name that is
    a prefix of this device's name applies as well.

    :param path: path to the device; symlinks are resolved first
    :return: True when a matching marker file exists; False otherwise,
             including when the path is not a block device under /dev/
             or when anything goes wrong while checking
    """
    disk = os.path.realpath(path)
    try:
        # Inspect the resolved device rather than the original path, so a
        # symlink pointing at a block device is recognized as one.
        if not disk.startswith('/dev/') or not stat.S_ISBLK(os.lstat(disk).st_mode):
            return False
        base = get_dev_name(disk)
        while base:
            if os.path.exists(SUPPRESS_PREFIX + base):  # noqa
                return True
            base = base[:-1]
    except Exception:
        # best effort: a failed check means "not suppressed"
        return False
    return False


def set_suppress(path):
    """
    Create the marker file that suppresses activation of a device.

    :param path: path to a block device; symlinks are resolved first
    :raises Error: when the resolved path does not exist or is not a
                   block device
    """
    disk = os.path.realpath(path)
    if not os.path.exists(disk):
        raise Error('does not exist', path)
    # Check the resolved device, so that a symlink to a block device is
    # accepted instead of being rejected as "not a block device".
    if not stat.S_ISBLK(os.lstat(disk).st_mode):
        raise Error('not a block device', path)
    base = get_dev_name(disk)

    # an empty file is enough: only its existence is tested
    with open(SUPPRESS_PREFIX + base, 'w'):  # noqa
        pass
    LOG.info('set suppress flag on %s', base)


def unset_suppress(path):
    """
    Remove the marker file that suppresses activation of a device.

    :param path: path to a block device; symlinks are resolved first
    :raises Error: when the resolved path does not exist, is not a block
                   device, is not currently marked as suppressed, or the
                   marker file cannot be removed
    """
    disk = os.path.realpath(path)
    if not os.path.exists(disk):
        raise Error('does not exist', path)
    # Check the resolved device, so that a symlink to a block device is
    # accepted instead of being rejected as "not a block device".
    if not stat.S_ISBLK(os.lstat(disk).st_mode):
        raise Error('not a block device', path)
    assert disk.startswith('/dev/')
    base = get_dev_name(disk)

    fn = SUPPRESS_PREFIX + base  # noqa
    if not os.path.exists(fn):
        raise Error('not marked as suppressed', path)

    try:
        os.unlink(fn)
        LOG.info('unset suppress flag on %s', base)
    except OSError as e:
        raise Error('failed to unsuppress', e)


def main_suppress(args):
    """Entry point of 'suppress-activate': mark args.path as suppressed."""
    target = args.path
    set_suppress(target)


def main_unsuppress(args):
    """Entry point of 'unsuppress-activate': clear the mark on args.path."""
    target = args.path
    unset_suppress(target)


def main_zap(args):
    """Entry point of 'zap': zap each device of args.dev, in order."""
    devices = args.dev
    for device in devices:
        zap(device)

###########################


def setup_statedir(dir):
    """
    Point the module at its state directory.

    Creates ``dir`` and ``dir``/tmp when missing, then (re)binds the
    module globals derived from it: STATEDIR, prepare_lock,
    activate_lock and SUPPRESS_PREFIX.
    """
    # XXX Module-wide state like this makes linting really hard and
    # should be avoided.
    global STATEDIR, prepare_lock, activate_lock, SUPPRESS_PREFIX

    STATEDIR = dir
    for needed in (STATEDIR, STATEDIR + "/tmp"):
        if not os.path.exists(needed):
            os.mkdir(needed)

    prepare_lock = filelock(STATEDIR + '/tmp/ceph-disk.prepare.lock')
    activate_lock = filelock(STATEDIR + '/tmp/ceph-disk.activate.lock')
    SUPPRESS_PREFIX = STATEDIR + '/tmp/suppress-activate.'


def setup_sysconfdir(dir):
    """Record ``dir`` in the module-level SYSCONFDIR global."""
    global SYSCONFDIR
    SYSCONFDIR = dir


def parse_args():
    """
    Build the ceph-disk command line parser and parse the process
    arguments.

    Each subcommand stores its handler in ``func``; the top-level parser
    also stores ``prog`` and a default ``cluster`` of 'ceph'.

    :return: the argparse namespace
    """
    parser = argparse.ArgumentParser(
        'ceph-disk',
        )
    parser.add_argument(
        '-v', '--verbose',
        action='store_true', default=None,
        help='be more verbose',
        )
    parser.add_argument(
        '--prepend-to-path',
        metavar='PATH',
        default='/usr/bin',
        help='prepend PATH to $PATH for backward compatibility (default /usr/bin)',
        )
    parser.add_argument(
        '--statedir',
        metavar='PATH',
        default='/var/lib/ceph',
        help='directory in which ceph state is preserved (default /var/lib/ceph)',
        )
    parser.add_argument(
        '--sysconfdir',
        metavar='PATH',
        default='/etc/ceph',
        help='directory in which ceph configuration files are found (default /etc/ceph)',
        )
    parser.set_defaults(
        # we want to hold on to this, for later
        prog=parser.prog,
        cluster='ceph',
        )

    subparsers = parser.add_subparsers(
        title='subcommands',
        description='valid subcommands',
        help='sub-command help',
        )

    # --- prepare ---
    prepare_parser = subparsers.add_parser('prepare', help='Prepare a directory or disk for a Ceph OSD')
    prepare_parser.add_argument(
        '--cluster',
        metavar='NAME',
        help='cluster name to assign this disk to',
        )
    prepare_parser.add_argument(
        '--cluster-uuid',
        metavar='UUID',
        help='cluster uuid to assign this disk to',
        )
    prepare_parser.add_argument(
        '--osd-uuid',
        metavar='UUID',
        help='unique OSD uuid to assign this disk to',
        )
    prepare_parser.add_argument(
        '--journal-uuid',
        metavar='UUID',
        help='unique uuid to assign to the journal',
        )
    prepare_parser.add_argument(
        '--fs-type',
        help='file system type to use (e.g. "ext4")',
        )
    prepare_parser.add_argument(
        '--zap-disk',
        action='store_true', default=None,
        help='destroy the partition table (and content) of a disk',
        )
    prepare_parser.add_argument(
        '--data-dir',
        action='store_true', default=None,
        help='verify that DATA is a dir',
        )
    prepare_parser.add_argument(
        '--data-dev',
        action='store_true', default=None,
        help='verify that DATA is a block device',
        )
    prepare_parser.add_argument(
        '--journal-file',
        action='store_true', default=None,
        help='verify that JOURNAL is a file',
        )
    prepare_parser.add_argument(
        '--journal-dev',
        action='store_true', default=None,
        help='verify that JOURNAL is a block device',
        )
    prepare_parser.add_argument(
        '--dmcrypt',
        action='store_true', default=None,
        help='encrypt DATA and/or JOURNAL devices with dm-crypt',
        )
    prepare_parser.add_argument(
        '--dmcrypt-key-dir',
        metavar='KEYDIR',
        default='/etc/ceph/dmcrypt-keys',
        help='directory where dm-crypt keys are stored',
        )
    prepare_parser.add_argument(
        'data',
        metavar='DATA',
        help='path to OSD data (a disk block device or directory)',
        )
    prepare_parser.add_argument(
        'journal',
        metavar='JOURNAL',
        nargs='?',
        help=('path to OSD journal disk block device;'
              + ' leave out to store journal in file'),
        )
    prepare_parser.set_defaults(
        func=main_prepare,
        )

    # --- activate ---
    activate_parser = subparsers.add_parser('activate', help='Activate a Ceph OSD')
    activate_parser.add_argument(
        '--mount',
        action='store_true', default=None,
        help='mount a block device [deprecated, ignored]',
        )
    activate_parser.add_argument(
        '--activate-key',
        metavar='PATH',
        help='bootstrap-osd keyring path template (%(default)s)',
        dest='activate_key_template',
        )
    activate_parser.add_argument(
        '--mark-init',
        metavar='INITSYSTEM',
        help='init system to manage this dir',
        default='auto',
        choices=INIT_SYSTEMS,
        )
    activate_parser.add_argument(
        'path',
        metavar='PATH',
        nargs='?',
        help='path to block device or directory',
        )
    activate_parser.set_defaults(
        activate_key_template='{statedir}/bootstrap-osd/{cluster}.keyring',
        func=main_activate,
        )

    # --- activate-journal ---
    activate_journal_parser = subparsers.add_parser('activate-journal', help='Activate an OSD via its journal device')
    activate_journal_parser.add_argument(
        'dev',
        metavar='DEV',
        help='path to journal block device',
        )
    activate_journal_parser.add_argument(
        '--activate-key',
        metavar='PATH',
        help='bootstrap-osd keyring path template (%(default)s)',
        dest='activate_key_template',
        )
    activate_journal_parser.add_argument(
        '--mark-init',
        metavar='INITSYSTEM',
        help='init system to manage this dir',
        default='auto',
        choices=INIT_SYSTEMS,
        )
    activate_journal_parser.set_defaults(
        activate_key_template='{statedir}/bootstrap-osd/{cluster}.keyring',
        func=main_activate_journal,
        )

    # --- activate-all ---
    activate_all_parser = subparsers.add_parser('activate-all', help='Activate all tagged OSD partitions')
    activate_all_parser.add_argument(
        '--activate-key',
        metavar='PATH',
        help='bootstrap-osd keyring path template (%(default)s)',
        dest='activate_key_template',
        )
    activate_all_parser.add_argument(
        '--mark-init',
        metavar='INITSYSTEM',
        help='init system to manage this dir',
        default='auto',
        choices=INIT_SYSTEMS,
        )
    activate_all_parser.set_defaults(
        activate_key_template='{statedir}/bootstrap-osd/{cluster}.keyring',
        func=main_activate_all,
        )

    # --- list ---
    list_parser = subparsers.add_parser('list', help='List disks, partitions, and Ceph OSDs')
    list_parser.set_defaults(
        func=main_list,
        )

    # --- suppress-activate / unsuppress-activate ---
    # PATH is mandatory for both: their handlers resolve it right away and
    # reject anything that is not a block device, so an omitted PATH used
    # to crash on None instead of producing a usage error.
    suppress_parser = subparsers.add_parser('suppress-activate', help='Suppress activate on a device (prefix)')
    suppress_parser.add_argument(
        'path',
        metavar='PATH',
        help='path to block device',
        )
    suppress_parser.set_defaults(
        func=main_suppress,
        )

    unsuppress_parser = subparsers.add_parser('unsuppress-activate', help='Stop suppressing activate on a device (prefix)')
    unsuppress_parser.add_argument(
        'path',
        metavar='PATH',
        help='path to block device',
        )
    unsuppress_parser.set_defaults(
        func=main_unsuppress,
        )

    # --- zap ---
    zap_parser = subparsers.add_parser('zap', help='Zap/erase/destroy a device\'s partition table (and contents)')
    zap_parser.add_argument(
        'dev',
        metavar='DEV',
        nargs='+',
        help='path to block device',
        )
    zap_parser.set_defaults(
        func=main_zap,
        )

    args = parser.parse_args()
    return args


def main():
    """
    Command line entry point.

    Parses the arguments, configures logging (DEBUG with --verbose,
    WARNING otherwise), prepends --prepend-to-path to $PATH unless it is
    empty, sets up the state and configuration directories and runs the
    selected subcommand.  Error and CephDiskException are turned into a
    SystemExit carrying a one-line message.
    """
    args = parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.WARNING,
        )

    if args.prepend_to_path != '':
        inherited_path = os.environ.get('PATH', os.defpath)
        os.environ['PATH'] = args.prepend_to_path + ":" + inherited_path

    setup_statedir(args.statedir)
    setup_sysconfdir(args.sysconfdir)

    try:
        args.func(args)
    except Error as e:
        message = '{prog}: {msg}'.format(prog=args.prog, msg=e)
        raise SystemExit(message)
    except CephDiskException as error:
        message = '{prog} {exc_name}: {msg}'.format(
            prog=args.prog,
            exc_name=error.__class__.__name__,
            msg=error,
        )
        raise SystemExit(message)


# Registry of one-time warnings that were already emitted, keyed by a
# short tag.  It has to exist before main() runs (and when the file is
# imported as a module): get_partition_type() consults it on its sgdisk
# fallback path and would otherwise fail with a NameError.
warned_about = {}

if __name__ == '__main__':
    main()
